BigQuery へのデータ INSERT をトリガに、Cloud Functions を実行してみた。
こんにちは、みかみです。
沖縄もそろそろ肌寒い毎日になってきました。
やりたいこと
- BigQuery へのデータ追加を検知して、後続処理を実行したい
- BigQuery へのデータ追加の監査ログをトリガに、Cloud Functions を実行したい
図にしてみると、こんな感じです。
前提
Google Cloud SDK(gcloud
コマンド)の実行環境は準備済みであるものとします。
本エントリでは、Cloud Shell を使用しました。
Eventarc API や、本エントリで利用している BigQuery, Cloud Functions, Cloud Build などの API は有効化済みです。
また、Compute Engine サービスアカウント( [PROJECT_NUMBER][email protected]
)に、「Eventarc イベント受信者」と 「Eventarc サービスエージェント」ロールを付与済みです。
Cloud Functions をデプロイ
以下の python コードを、main.py
というファイル名で保存しました。
import functions_framework @functions_framework.cloud_event def fetch_insert(cloudevent): print(f"Event type: {cloudevent['type']}") event_data = cloudevent.data protoPayload = event_data['protoPayload'] metadata = protoPayload['metadata'] tableDataChange = metadata.get('tableDataChange', None) if tableDataChange is None: return 'OK', 200 insertedRowsCount = tableDataChange.get('insertedRowsCount', None) if insertedRowsCount is None: return 'OK', 200 resourceName = protoPayload['resourceName'] datasetId = resourceName.split('/')[3] tableId = resourceName.split('/')[5] print(f'{insertedRowsCount} rows inserted to {datasetId}.{tableId}') return 'OK', 200
イベント( Audit Log )データから metadata
を取得し、テーブルデータが変更されていない場合や、insert された行数の情報がない場合はそのまま終了。
insert 行数がある場合はロード先データセットとテーブル名を取得してログ出力します。
以下のコマンドで Cloud Functions をデプロイします。
BigQuery の InsertJob
監査ログ出力をトリガに Cloud Functions を起動するように、--trigger-event-filters
で指定しました。
gcloud functions deploy fetch-insert \ --gen2 \ --region asia-northeast1 \ --runtime python310 \ --entry-point=fetch_insert \ --trigger-event-filters="type=google.cloud.audit.log.v1.written" \ --trigger-event-filters="serviceName=bigquery.googleapis.com" \ --trigger-event-filters="methodName=google.cloud.bigquery.v2.JobService.InsertJob" \ --trigger-location=asia-northeast1
デプロイ時に、「全てのデータアクセスログが有効になってません」な WARNING が表示されましたが、
Enable all Data Access audit logs for [bigquery.googleapis.com]? (Y/n)? n WARNING: Manual enablement of Data Access audit logs may be necessary.
今回使用する InsertJob
のログ出力はデフォルトで有効になっているので、無視して( n
+ Enter
で)デプロイします。
無事、デプロイできました。
Cloud Functions を実行
BigQuery のクエリエディタから insert クエリを実行して、既存テーブルに2行のレコードを追加しました。
少し待ってから、Cloud Functions のログを確認してみます。
期待通り、BigQuery へのでデータ insert をトリガに、CLoud Functions を起動することができました。
はまったところ
権限周りで、ちょっとはまりました。。
Pub/Sub サービスアカウント
はじめ、いくら待っても Cloud Functions が起動しない。。
Eventarc の「トリガーの詳細」確認したら、権限エラーになってました。。
Pub/Sub デフォルトサービスアカウントに「サービスアカウントトークン作成者」ロールを付与して解決しました。
gcloud projects add-iam-policy-binding [PROJECT_ID] \ --member="serviceAccount:service-[PROJECT_NUMBER]@gcp-sa-pubsub.iam.gserviceaccount.com" \ --role='roles/iam.serviceAccountTokenCreator'
ドキュメントにもちゃんと書いてありますね。。
Cloud Run のサービス間認証
Pub/Sub サービスアカウントに権限追加して、これで大丈夫だろうと思ったら、今度は 403 エラー発生。。
Compute Engine サービスアカウントに、対象サービス( Cloud Functions 関数)を実行する権限を付与して解決しました。
gcloud run services add-iam-policy-binding fetch-insert \ --region=asia-northeast1 \ --member='serviceAccount:[PROJECT_NUMBER][email protected]' \ --role='roles/run.invoker'
補足:データロードの場合
load job( bq load
コマンド使用)で --autodetect
を指定して BigQuery にデータを追加した場合には、Audit Log 出力内容に追加行数の情報がなく、本エントリで使用しているサンプルコードでは insert 行数を検出することはできませんでした。(テーブル再作成になるためでしょうか?
ですが、--autodetect
指定なしの場合は、期待通り Cloud Functions で追加行数を検出することができました。
また、最近 GA になった LOAD DATA
構文を使用して GCS から BigQuery にデータをロードする場合には、query job で実行されるものの、ログ出力内容にはテーブルデータ更新情報は含まれず、こちらも本エントリサンプルコードの Cloud Functions では検出することはできませんでした。
処理に合わせてログ出力内容をご確認ください。
まとめ(所感)
はじめ、Eventarc のトリガー諸々を作成した後に Cloud Functions デプロイしないといけないのかと思っていたのですが、Cloud Functions デプロイ( gcloud functions deploy
)コマンドだけで Eventarc トリガーも Pub/Sub トピックも自動で作成してくれるので、思ったより簡単に Cloud Functions v2 デプロイできました。
Eventarc を使えるようになったことで、Cloud Functions 起動トリガが増えたことは嬉しい限りです!
BigQuery へのデータロード後に、スケジュール実行でマート作成などの後続処理を行っているケースも多いのではないかと思いますが、Eventarc 経由で Cloud Functions や Cloud Run、Workflows を実行すれば、イベントドリブンで後続処理を実行できます。 先行処理実行後すぐに後続処理を実行したい場合や、先行処理の実行結果を判定後に後続処理を実行したい場合など、スケジュール駆動よりイベント駆動の方が望ましい場合には、Eventarc 経由の実装を是非ご検討ください。
参考
- BigQuery イベントで Cloud Run アクションをトリガーする方法 | Google Cloud ブログ
- Workflows 向けの Eventarc トリガーのご紹介 | Google Cloud ブログ
- クイックスタート | Eventarc ドキュメント
- Event types supported by Eventarc | Eventarc ドキュメント
- gcloud functions deploy | Cloud SDK
- gcloud eventarc triggers create | Cloud SDK
- Eventarc トリガー | Cloud Functions ドキュメント
- GoogleCloudPlatform/eventarc-samples/gcf | GitHub